extern crate alloc;

use alloc::format;
use alloc::string::String;
use embed_std::sync::arc::Arc;
use embed_std::thread::set_thread_name;
use embed_std::thread::sleep_ms;
use embed_std::{errorln, infoln};
use qbus::qbus::QBus;
use qbus::qconsumer::TOPIC_ALL;
use qbus::qdispatcher::QDispatcher;
use qbus::qdispatcher::QEventHandlerR;
use qbus::qmessage::INVALID_SEQ;

/// Event listener registered with the dispatcher by `test_bus`; it prints every
/// event it is handed.
struct MsgHandler {}

impl QEventHandlerR for MsgHandler {
    /// Prints the event id and its payload to stdout.
    ///
    /// The payload is decoded leniently: any byte sequence that is not valid
    /// UTF-8 is rendered as U+FFFD instead of being reinterpreted unchecked,
    /// so a malformed payload can no longer cause undefined behaviour.
    fn handle(&mut self, event_id: u32, data: &[u8]) {
        let text = String::from_utf8_lossy(data);
        println!("event id {}, data: {}", event_id, text);
    }
}

fn test_bus() {
    let bus = QBus::new(200);
    let dispatcher = QDispatcher::new(bus.clone());
    dispatcher.start_dispatch().expect("start dispatch failed");
    let is_quit = Arc::new(core::sync::atomic::AtomicBool::new(false));
    for i in 0..10 {
        let is_quit_1 = is_quit.clone();
        let mut consumer = bus.create_consumer();
        consumer.subscribe(TOPIC_ALL);
        // consumer.subscribe("aa");
        // consumer.subscribe("bb");
        let dp = dispatcher.clone();
        let token = dp
            .add_listener_r(&[1, 2, 4], Box::new(MsgHandler {}))
            .expect("add failed");
        println!("token: {}", token);
        let _ = embed_std::thread::spawn(move || {
            set_thread_name(format!("test_bus_{}", i).as_str());
            let mut count = 0;
            while !is_quit_1.load(core::sync::atomic::Ordering::SeqCst) {
                if false {
                    let mut seq = INVALID_SEQ;
                    let mut resp = String::new();
                    if consumer.read_map(|msg| {
                        count += 1;
                        seq = msg.req_id;
                        let pl = unsafe { core::str::from_utf8_unchecked(msg.payload.as_slice()) };
                        resp = format!(
                            "[{}] seq {} topic {}: payload {}",
                            i, msg.req_id, msg.topic, pl
                        );
                        infoln!(
                            "[{}] got msg id: {}, topic: {}, payload: {} consumers {}, total {}",
                            i,
                            msg.id,
                            msg.topic,
                            pl,
                            msg.consume_count,
                            count
                        );
                    }) {
                        if seq != INVALID_SEQ {
                            consumer.send_resp(seq, Vec::from(resp));
                        }
                    } else {
                        sleep_ms(1);
                    }
                } else {
                    if let Some(msg) = consumer.read() {
                        count += 1;
                        let pl = unsafe { String::from_utf8_unchecked(msg.payload) };
                        infoln!(
                        "[{}] got msg id: {}, topic: {}, payload: {}, consume_count {}, total {}",
                        i,
                        msg.id,
                        msg.topic,
                        pl,
                            msg.consume_count,
                        count
                    );
                        if msg.req_id != INVALID_SEQ {
                            let resp =
                                format!("seq {} topic {}: payload {}", msg.req_id, msg.topic, pl);
                            consumer.send_resp(msg.req_id, Vec::from(resp));
                        }
                    } else {
                        sleep_ms(1);
                    }
                }
            }
            println!("remove token: {}", token);
            dp.remove_listener_r(token);
        });
    }
    for i in 0..1000 {
        let pl = format!("{} aa", i);
        match bus.call("aa", pl.as_bytes(), 30) {
            Ok(data) => {
                let v = unsafe { String::from_utf8_unchecked(data) };
                infoln!("resp => aa: {}", v);
            }
            Err(err) => {
                errorln!("{} failed: {}", i, err);
            }
        }
        let _ = bus.publish_vec("bb", &[format!("{} bb", i).as_bytes()]);
        let _ = bus.publish("cc", format!("{} cc", i).as_bytes());

        dispatcher
            .publish(1, format!("{}: event 1", i).as_bytes())
            .expect("fail");
        dispatcher
            .publish(4, format!("{}: event 4", i).as_bytes())
            .expect("fail");
        dispatcher
            .publish(2, format!("{}: event 2", i).as_bytes())
            .expect("fail");
        if i % 50 == 0 {
            sleep_ms(50);
        }
        // if i > 10 {
        //     break;
        // }
    }
    sleep_ms(3000);
    is_quit.store(true, core::sync::atomic::Ordering::SeqCst);
    sleep_ms(3000);
    // dispatcher.stop_dispatch();
    // sleep_ms(3000);
}

/// Program entry point: runs the bus exercise once, then idles for two seconds
/// before the process exits.
fn main() {
    // NOTE(review): presumably this delay lets background threads finish their
    // output before exit — confirm.
    let exit_delay_ms = 2000;

    test_bus();
    sleep_ms(exit_delay_ms);
}
